package com.carol.bigdata.task.feature


import com.carol.bigdata.Config
import com.carol.bigdata.constant.KVConstant
import com.carol.bigdata.utils.HBaseUtil
import com.carol.bigdata.utils.{Flag, IpTool, TimeUtil}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import com.carol.bigdata.utils.FeatureUtil
import com.carol.bigdata.utils.RddReader
import org.slf4j.{Logger, LoggerFactory}


/**
 * Daily job that computes per-user environment features (login address,
 * device, OS) from the login log, writes the daily values to the HBase
 * user-profile table under the `env_tag` column family, and then folds them
 * into cumulative `*_total` columns.
 */
object CalEnvTag {

    val LOG: Logger = LoggerFactory.getLogger(this.getClass)

    // Base key fields shared by all feature computations.
    val keyColumns: List[String] = KVConstant.keyColumns
    val keyLen: Int = keyColumns.length
    val cf: String = KVConstant.cf
    val propertiesField: String = "properties"

    // Source: login table.
    val loginGameTable: String = KVConstant.actionTable
    val loginGameColumns: List[String] = keyColumns :+ propertiesField
    val loginGamePattern: String = KVConstant.loginPattern
    val loginPropIdx: Int = loginGameColumns.indexOf(propertiesField)
    // Address parts carry a '$' prefix to match the upstream property naming.
    val loginPropColumns: List[String] = List("country", "province", "city").map("$" + _) ++ List("device_type", "os")
    val devIdx: Int = loginPropColumns.indexOf("device_type")
    val cityIdx: Int = loginPropColumns.indexOf("$city")
    // Number of leading columns (country, province, city) that form the address.
    val addressLength: Int = cityIdx + 1

    // Sink: per-user profile statistics table (HBase).
    val userProfileTable: String = KVConstant.userProfileTable
    val keyCF: String = KVConstant.keyCF
    val keyCols: List[String] = KVConstant.dateKeyColumns
    val writeKeyCols: List[String] = KVConstant.writeKeyCols
    val event: String = "profile"

    // Environment feature columns, all aggregated as value->count maps,
    // written under the env_tag column family.
    val envMiddleCF = "env_tag"
    val envFeat: List[(String, String)] = List(
        ("$address", "map"),
        ("$device", "map"),
        ("$os", "map")
    )

    /**
     * Aggregates per-user environment features for one day, resolving each raw
     * login IP to an address via the IP searcher loaded from `ipPath`.
     *
     * @param statDay    stat date prepended to every output key
     * @param loginUsers rows of List(gameId, accountId, loginIP, device, ...);
     *                   the first `keyLen` fields are the user key
     * @param featList   (featureName, aggregationType) pairs for FeatureUtil.calFeat
     * @param ipPath     path to the IP-to-address search index
     * @return (statDay :: userKey, feature values) per user
     */
    def calEnvInfo(statDay: String,
                   loginUsers: RDD[List[String]],
                   featList: List[(String, String)],
                   ipPath: String): RDD[(List[String], List[String])] = {
        // NOTE(review): the searcher is created lazily on first use inside the
        // map closure; confirm IpTool.getSearcher's result survives task
        // serialization, otherwise build it per partition via mapPartitions.
        lazy val ip2Address = IpTool.getSearcher(ipPath)
        val envInfoRDD = loginUsers
          // (userKey, one single-element list per property column)
          .map(x => (x.take(keyLen), x.takeRight(x.length - keyLen).map(List(_))))
          // Concatenate each property column's values across a user's logins.
          .reduceByKey((a, b) => a.zip(b).map(i => i._1 ++ i._2))
          .map(x => {
              // The first column holds the raw login IPs; resolve each to
              // "country_province_city", with "0" for any unresolved part.
              val address: List[String] = x._2.head.map(i => {
                  val addressInfoMap = IpTool.getCityInfo(ip2Address, i)
                  val country: String = addressInfoMap.getOrDefault("country", "0")
                  val province: String = addressInfoMap.getOrDefault("province", "0")
                  val city: String = addressInfoMap.getOrDefault("city", "0")
                  country + "_" + province + "_" + city
              })

              // Replace the IP column with resolved addresses and append a
              // normalized OS column derived from the device column
              // ("iphone" -> "ios", everything else -> "android").
              val tmp = address +: x._2.tail :+ x._2(devIdx).map(i => if (i == "iphone") "ios" else "android")
              val res = FeatureUtil.calFeat(tmp, featList)
              (statDay +: x._1, res)
          })

        // count()/take() each trigger a full Spark job over the lineage;
        // only pay that cost when debug logging is actually enabled.
        if (LOG.isDebugEnabled) {
            LOG.debug(s"envInfoRDD: ${envInfoRDD.count()}, feat: $featList")
            envInfoRDD.take(5).foreach(r => LOG.debug(r.toString))
        }
        envInfoRDD
    }

    /**
     * Same aggregation as [[calEnvInfo]], but for input whose address parts
     * (country, province, city) are already materialized as columns, so no IP
     * lookup is needed.
     *
     * @param statDay    stat date prepended to every output key
     * @param loginUsers rows of List(gameId, accountId, country, province,
     *                   city, device_type, os)
     * @param featList   (featureName, aggregationType) pairs for FeatureUtil.calFeat
     * @return (statDay :: userKey, feature values) per user
     */
    def calEnvInfoWithoutIp(statDay: String,
                            loginUsers: RDD[List[String]],
                            featList: List[(String, String)]): RDD[(List[String], List[String])] = {
        val envInfoRDD = loginUsers
          // (userKey, List(List(address), List(device), List(os)))
          .map(x => {
              val keys = x.take(keyLen)
              // Join country/province/city into a single "c_p_c" address value.
              val address = x.slice(keyLen, keyLen + addressLength).mkString("_")
              (keys, (address +: x.takeRight(2)).map(List(_)))
          })
          // Concatenate each column's values across a user's logins.
          .reduceByKey((a, b) => a.zip(b).map(i => i._1 ++ i._2))
          .map(x => {
              val res = FeatureUtil.calFeat(x._2, featList)
              (statDay +: x._1, res)
          })
        // Guard: the interpolated count() would otherwise run a full Spark job
        // even when debug logging is disabled.
        if (LOG.isDebugEnabled) {
            LOG.debug(s"envInfoRDD: ${envInfoRDD.count()}, feat: $featList")
            envInfoRDD.take(5).foreach(r => LOG.debug(r.toString))
        }
        envInfoRDD
    }


    /**
     * Runs the environment-feature pipeline for one game and one stat day:
     * read login log, aggregate daily features, write them to HBase, then
     * merge with yesterday's totals and write the cumulative columns.
     *
     * @param hbaseParams HBase connection/configuration parameters
     * @param spark       active session (Hive support required by RddReader)
     * @param statDay     stat date, "yyyy-MM-dd"
     * @param game        game id to process
     */
    def run(hbaseParams: Map[String, String],
            spark: SparkSession,
            statDay: String,
            game: String): Unit = {
        println("计算环境特征...")
        val yesterday: String = TimeUtil.getTimeDeltaDay(statDay, -1)
        val yesterdayRowKey = for (date <- List(yesterday)) yield FeatureUtil.getDataRowKey(game, event, date)
        // 1. Read the login log from Hive.
        val loginUsers = RddReader.readHiveRDD(spark, loginGameTable, statDay, loginGamePattern, game, keyColumns, loginPropColumns)
        // NOTE(review): each count()/take() below re-runs the lineage; they are
        // kept as intentional progress output but are not free.
        println("loginUsers:", loginUsers.count())
        loginUsers.take(5).foreach(println)

        // 2. Aggregate daily features:
        // (List(time, game_id, uid), List(maxLoginAddress, maxLoginDevice, maxLoginOS))
        val envRDD = calEnvInfoWithoutIp(statDay, loginUsers, envFeat)
        println("envRDD:", envRDD.count())
        envRDD.take(5).foreach(println)

        // Reshape keys for the profile table:
        // (List(game_id, event, time, uid), feature values)
        val writeEnvInfoRDD = FeatureUtil.changeProfileRDD(envRDD)
        println("writeEnvInfoRDD:", writeEnvInfoRDD.count())
        writeEnvInfoRDD.take(5).foreach(println)
        // 3. Write the daily values to the HBase profile table.
        val envFeatColumns = envFeat.map(_._1)
        HBaseUtil.writeRDD2KVColumn(hbaseParams, userProfileTable, writeEnvInfoRDD, keyCF, writeKeyCols, envFeatColumns,
            List.fill(envFeatColumns.length)(envMiddleCF))

        // 4. Merge today's values with yesterday's running totals.
        val List(_, totalMapRDD) = FeatureUtil.calTotal(hbaseParams, spark, statDay, envRDD, userProfileTable,
            List(keyCF, envMiddleCF), keyCols, envFeat, yesterdayRowKey)
        val writeTotalMapRDD = FeatureUtil.changeProfileRDD(totalMapRDD)

        // 5. Write the cumulative columns ("<feature>_total").
        val totalColumns = envFeatColumns.map(_ + "_total")
        HBaseUtil.writeRDD2KVColumn(hbaseParams, userProfileTable, writeTotalMapRDD, keyCF, writeKeyCols, totalColumns,
            List.fill(totalColumns.length)(envMiddleCF))
    }


    /** Entry point: runs the pipeline for a fixed game/date range. */
    def main(args: Array[String]): Unit = {
        Flag.Parse(args)
        val spark = SparkSession.builder()
          .config(conf = Config.sparkConf)
          .enableHiveSupport()
          .getOrCreate()
        // Hive address settings must be applied after the SparkSession is
        // created, otherwise the various config files may override them and
        // stall the job.

        val sc = spark.sparkContext
        sc.setLogLevel("ERROR")
        // hbaseParams is only read on the driver and passed by value into run;
        // broadcasting it and immediately dereferencing .value added nothing
        // (and leaked the broadcast), so use the config directly.
        val hbaseParams: Map[String, String] = Config.hbaseParams
        val gameList = List("5")
        for (day <- 1 to 1) {
            val statDay = "2021-08-%02d".format(day)
            for (game <- gameList) {
                run(hbaseParams, spark, statDay, game)
            }
        }
        spark.stop()
    }

}
